package rpc.provider.common.manager;

import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rpc.constants.RpcConstants;
import rpc.protocol.RpcProtocol;
import rpc.protocol.header.RpcHeader;
import rpc.protocol.header.RpcHeaderFactory;
import rpc.protocol.response.RpcResponse;
import rpc.provider.common.cache.ProviderChannelCache;
import rpc.protocol.enumeration.RpcType;

import java.util.Set;

/**
 * @version 1.0.0
 * @description 服务提供者连接管理器
 */
public class ProviderConnectionManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProviderConnectionManager.class);
    /**
     * 扫描并移除不活跃的连接
     */
    public static void scanNotActiveChannel(){
         Set<Channel> channelCache = ProviderChannelCache.getChannelCache();
        if (channelCache == null || channelCache.isEmpty()) return;
        channelCache.stream().forEach((channel) -> {
            if (!channel.isOpen() || !channel.isActive()){
                channel.close();
                ProviderChannelCache.remove(channel);
            }
        });
    }

    /**
     * 发送ping消息
     */
//    public static void broadcastPingMessageFromProvider(){
//        Set<Channel> channelCache = ProviderChannelCache.getChannelCache();
//        if (channelCache == null || channelCache.isEmpty()) return;
//        RpcHeader header = RpcHeaderFactory.getRequestHeader(RpcConstants.SERIALIZATION_PROTOSTUFF, RpcType.HEARTBEAT_FROM_PROVIDER.getType());
//        RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<RpcResponse>();
//        RpcResponse rpcResponse = new RpcResponse();
//        rpcResponse.setResult(RpcConstants.HEARTBEAT_PING);
//        responseRpcProtocol.setHeader(header);
//        responseRpcProtocol.setBody(rpcResponse);
//          channelCache.stream().forEach((channel) -> {
//            if (channel.isOpen() && channel.isActive()){
//                LOGGER.info("send heartbeat message to service consumer, the consumer is: {}, the heartbeat message is: {}", channel.remoteAddress(), rpcResponse.getResult());
//                channel.writeAndFlush(responseRpcProtocol);
//            }
//        });
//    }
    public static void broadcastPingMessageFromProvider(){
        Set<Channel> channelCache = ProviderChannelCache.getChannelCache();
        if (channelCache == null || channelCache.isEmpty()){
            System.out.println("为什么我一直是空的");
            return;
        }
        RpcHeader header = RpcHeaderFactory.getRequestHeader(RpcConstants.SERIALIZATION_PROTOSTUFF, RpcType.HEARTBEAT_FROM_PROVIDER.getType());
//        RpcHeader header = RpcHeaderFactory.getRequestHeader(RpcConstants.SERIALIZATION_FST, RpcType.HEARTBEAT_FROM_PROVIDER.getType());
        RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<RpcResponse>();
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setResult(RpcConstants.HEARTBEAT_PING);
        responseRpcProtocol.setHeader(header);
        responseRpcProtocol.setBody(rpcResponse);
        System.out.println("俺老孙到ProviderConnectionManager一游");
        channelCache.stream().forEach((channel) -> {
            if (channel.isOpen() && channel.isActive()){
                LOGGER.info("send heartbeat message to service consumer, the consumer is: {}, the heartbeat message is: {}", channel.remoteAddress(), rpcResponse.getResult());
                channel.writeAndFlush(responseRpcProtocol);
            }
        });
    }
}
